先簡單說明一下websocket是什麼,Socket就是一堆洞的意思,看下圖有Socket的圖,不過這是CPU Socket,CPU和插槽長像下面這樣,以前Intel和AMD還可以換著插,現在都不行了,只能各插各的,形狀要符合才能插:
但我們現在要講的不是這個,在軟體開發一般單講Socket是在講網路的洞,什麼是網路的洞,下面這張圖表達的很清楚,想像一條看不見的 手(X) 線(O),透過網路連接2台不同的主機,所以要有洞才能讓這條線插,就像我們每天都在插USB充電一樣,而我們的應用程式也要指到這個對應的洞,才不會傳錯資料,錯把馮京當馬涼,把要傳給女友的訊息傳給男友就糗了(咦?)。
所以Socket是屬於比較偏底層的東西,而websocket(ws)呢,他是在高層裡的底層(? 阿鬼 你還是說中文吧),就像我們之前在用WebAssembly不也是在Web裡跑Assembly語言嗎(雖然現在wasm也可以用在非web的環境,而且還慢慢的紅起來),websocket只是在web環境中用起來很像socket,實作上的細節差很多,想了解的話去看邦友的文章,而想知道wasm不在web裡怎麼用的可以去看這系列介紹。
在.NET裡有SignalR幫我們包好了,會依不同環境的相容性看情況使用ws,或是在不相容的瀏覽器使用其他協定去達到類似的結果。在Rust中,我們很多東西要自己處理,不過好在有豐富的社群和生態系支持,這些比較底層的東西大多有前輩做好了,我們只要學習如何使用就好(不過如果是要寫IoT要顧及頻寬及電力問題,可能需要知道更底層的東西來做優化)。
前言到這邊,我們看一下warp的hello_websocket,把它接進我們的router裡:
@@ Cargo.toml @@
+futures-util = { version = "0.3" }
@@ web/Cargo.toml @@
+futures-util = { workspace = true }
// web/src/routers.rs
pub fn all_routers() ->
// ... 略
ws_routers()
.or(hello)
.or(api_games)
.recover(error::handle_rejection)
.with(cors_config())
.with(warp::trace::request())
}
use futures_util::stream::StreamExt;
use futures_util::FutureExt;
pub fn ws_routers() -> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
warp::path("echo")
.and(warp::ws())
.map(|ws: warp::ws::Ws| {
ws.on_upgrade(|websocket| {
let (tx, rx) = websocket.split();
rx.forward(tx).map(|result| {
if let Err(e) = result {
tracing::info!("websocket error: {:?}", e);
}
})
})
})
}
這個範例只是很簡單的,從前端收到什麼訊息,就回應什麼訊息:
欸不是,放錯圖了,跑出來的結果是下面這個:
看不出來有沒有正確執行(?),我們用前端開 ws client試一下:
通常前端個人傾向從console中進行測試,因為可以快速得到結果,據以驗證自己的演算法對不對,直譯式語言的好處就不是不用編譯,打什麼出什麼。等試成功後再把code寫到code base裡,在MDN的文件有提到怎麼寫javascript的websocket:
var mySocket = new WebSocket("ws://localhost:3030/echo");
記得網頁要開localhost:3030,在console才不會被CORS擋掉。
看後端的畫面有收到echo的訊息:
我們試著從前端發一下訊息
這時候從前端發送資料給websocket,但沒反應,因為他是保持連線,不是request/response模式,所以需要用監聽的方式監聽後端有沒有發送訊息過來。在這裡依MDN說明去註冊一個監聽的事件,再重新傳送剛剛的訊息,就可以得到回應的結果:
mySocket.onmessage = function (e) {
console.log(e.data);
};
不過,只有資料,這時可以把範例的 .data
拿掉查看完整的訊息物件:
可以看到data只是回應裡面的一個部分,還有其他meta資訊。
測試ws可以作用後,我們到code base裡去實現吧:
我們在 api 中加 websocket client:
// app/src/api/ws_client.ts
export const wsClient = (): WebSocket => {
const url = `${import.meta.env.VITE_WS_BASE_URL}/echo`;
const ws: WebSocket = new WebSocket(url);
ws.onopen = () => {
console.log('ws open');
}
ws.onclose = (): void => {
console.log('ws close');
}
ws.onerror = (err: Event) => {
console.log('ws error', err);
}
ws.onmessage = (msg: MessageEvent<any>) => {
console.log('ws message', msg);
}
return ws;
}
其中環境變數加在 前端node.js使用的.env裡
# app/.env
VITE_WS_BASE_URL=ws://localhost:3030
使用Greet來進行概念性驗證:
<script lang="ts">
import { wsClient } from '../api/ws_client';
let message = '';
let responseMessage = '';
let ws = wsClient();
ws.onmessage = function (event) {
console.log('Message from server ', event.data);
responseMessage = event.data;
};
async function greet() {
ws.send(message);
}
</script>
<div>
<form class="row" on:submit|preventDefault={greet}>
<input id="greet-input" placeholder="Enter message" bind:value={message} />
<button type="submit">Greet</button>
</form>
<p>{responseMessage}</p>
</div>
看起來沒有問題。
ps 這裡要同時跑 web http, tauri, grpc server 3個terminal。
剛剛的websocket只是簡單echo,除了測試服務是不是還活著,好像沒什麼用途(還有用來demo XDD)。因為websocket的連線是持續活著(直到一方關閉),我們在這邊借用tokio-stream的幫忙,想像連線一直開始其實就是一種串流,一邊丟資料另一邊接到一包資料就可以進行處理。另外也需要一個state來存放,跟先前在第14篇幫tauri加state很像:
@@ Cargo.toml @@
[workspace.dependencies]
+tokio-stream = { version = "0.1" }
@@ web/Cargo.toml @@
[dependencies]
+tokio-stream = { workspace = true }
@@ web/src/lib.rs @@
+pub mod web_socket;
+pub mod app_context;
我們開一個context,存放作為整個App的State狀態管理使用
// web/src/app_context.rs
use std::{sync::Arc, collections::HashMap};
use tokio::sync::{RwLock, mpsc::UnboundedSender};
use warp::ws::Message;
#[derive(Clone)]
pub struct AppContext {
pub ws_connections:
Arc<
RwLock<
HashMap<
usize,
UnboundedSender<Message>
>
>
>,
}
impl Default for AppContext {
fn default() -> Self {
Self {
ws_connections:
Arc::new(
RwLock::new(
HashMap::new()
)
),
}
}
}
其中HaspMap
存放的就是websocket的連線,使用key/value,key為整數usize,用以辨識連線用戶端,如果有需要也可以使用其他key值,或綁定用戶資訊等。RwLock之前在第7篇有順帶提到,這裡我們只有在「用戶端建立新連線」的時候才會取號並寫入HashMap
中,其他時候調用現有的websocket連線都只是對HashMap
進行讀取,是故在這裡用Mutex相對效率不彰。
// web/src/main.rs
use web::{config, routers, app_context::AppContext};
#[tokio::main]
async fn main() {
config::init();
let _logger = Logger::builder().use_env().build();
let app_context = AppContext::default(); // 加入App狀態機
let routers = routers::all_routers(app_context.clone()); // 注入
warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;
}
每回合的poc都是隨意寫的code,先求有再求好,確認想法ok要開始實作時,記得先整理一下程式碼,我們直接把web_socket移出為一個獨立的mod檔案。
這裡有一篇文章:我的寫法我的 Team 都看得懂,針對寫出清晰可讀的代碼有滿獨到的見解,必看。
// web/src/routers.rs
use crate::app_context::AppContext;
use crate::web_socket::ws_routers;
pub fn all_routers(ctx: AppContext) // 把 state 作為參數注入
-> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
// ...略
hello
.or(ws_routers(ctx.clone())) // 傳遞state
// 略
}
// 移除舊的ws_routes
上面把router裡的ws_routes移除,並使用引用的方式,從另一個mod web_scoket引入,以下是mod web_socket路由,內容有點長,這裡分段講述:
// web/src/web_socket.rs
use std::sync::atomic::{AtomicUsize, Ordering};
use futures_util::{SinkExt, TryFutureExt, stream::StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::{Filter, Rejection, Reply, ws::{Message, WebSocket}};
use crate::app_context::AppContext;
static NEXT_CONNECTION_ID: AtomicUsize = AtomicUsize::new(1); // 取號用
pub fn ws_routers(context: AppContext)
-> impl Filter<Extract=impl Reply, Error=Rejection> + Clone {
warp::path("echo") // web socket 路由
.and(warp::ws())
.and(warp::any().map(move || context.clone())) //注入 State
.map(|ws: warp::ws::Ws, ctx: AppContext| { // 注入 State
ws.on_upgrade(move |socket| ws_connected(socket, ctx))
}) // ↑↑ 接到連線使用我們定義的ws_connected fn處理
}
上面的map
是extrator,抽取整串pipeline裡面有的東西,可能是reqeust傳進來的,可能是我們在中間加入的(middleware),整個管線如同第13篇的鐵道開發法。
而裡面的ws.on_upgrade
,需要傳入一個Function,用來處理當有websocket連線時,而且這個Function必需回傳一個Future
物件,只要我們在fn
前面加一個async
,rust就會把我們把它傳為Future
,以下我們寫一個ws_connected
,傳入連線物件及App狀態物件,而這個ws_connected
存活期間是跟著每一條連線走的,有幾條連線就有幾個ws_connected
,裡面有一個無窮迴圈,也因此這裡強迫我們使用Future
,如此可以透過async的runtime進行資源調派,(一個連線就開一個分身處理的意思)。
實作連線ws_connected
內容架構如下:
// web/src/web_socket.rs
async fn ws_connected(ws: WebSocket, ctx: AppContext) {
let conn_id = NEXT_CONNECTION_ID
.fetch_add(1, Ordering::Relaxed); // 取號
tracing::info!("new websocket connection: {}", conn_id);
let (mut ws_tx, mut ws_rx) = ws.split(); // 取得 ws 的 tx/rx
let (tx, rx) = mpsc::unbounded_channel(); // 建立一個 mpsc 通道
let mut rx = UnboundedReceiverStream::new(rx); // mpsc 收受端
// 等待程式通知要發送訊息時的處理,透過mpsc接受訊息
tokio::task::spawn(async move { // 建立工作(分身)監聽
while let Some(message) = rx.next().await { // 當 mpsc 收到訊息時
ws_tx
.send(message) // 從 ws 發送該訊息
.unwrap_or_else(|e| {
tracing::info!("websocket send error: {}", e);
})
.await;
}
});
// 把連線id和 mpsc 的發送端,存到AppState裡
// 後續我們在程式任意地方都可以呼叫這個tx
let _ = &ctx.ws_connections.write().await.insert(conn_id, tx);
// 處理當 websocket 接收到資料的時候
while let Some(result) = ws_rx.next().await {
let msg = match result { // 解析收到的訊息
Ok(msg) => msg,
Err(e) => {
tracing::info!("websocket error(uid={}): {}", conn_id, e);
break;
}
};
// 處理訊息,本例是發送給websocket全連線端
send_all_message(conn_id, msg, &ctx).await;
}
// 處理斷線處理
disconnected(conn_id, &ctx).await;
}
以下是上面幾個處理訊息fn的實作:
// web/src/web_socket.rs
/// 發送訊息給特定id用戶端
async fn send_one_message(my_id: usize, msg: Message, ctx: &AppContext) {
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", my_id, msg); // 訊息帶用戶id
// 找出特定 id 的 tx,並傳送文字訊息
for (&uid, tx) in ctx.ws_connections.read().await.iter()
.filter(|(&uid, _)| uid == my_id) { // 選出特定id
if let Err(_disconnected) = tx
.send(Message::text(new_msg.clone())) {} // 發送訊息
}
}
/// 發送訊息給所有連線用戶端
async fn send_all_message(my_id: usize, msg: Message, ctx: &AppContext) {
let msg = if let Ok(s) = msg.to_str() {
s
} else {
return;
};
let new_msg = format!("<User#{}>: {}", my_id, msg); // 訊息帶用戶id
// 遍歷所有的連線端,傳送文字訊息
for (&uid, tx) in ctx.ws_connections.read().await.iter() {
if let Err(_disconnected) = tx
.send(Message::text(new_msg.clone())) {}
}
}
/// 處理websocket連線中斷
async fn disconnected(conn_id: usize, ctx: &AppContext) {
tracing::info!("disconnected conn_id: {}", conn_id);
// 從AppState移除該連線tx通道
ctx.ws_connections.write().await.remove(&conn_id);
}
這裡舉例只是簡單用一對一,一對多的傳遞,依實際境情可以另外做一群組,然後選擇哪些用戶端是同一個群組的,就可以依不同群組發送不同的訊息。
到這裡把基礎架設好了,我們來演示一個實際的應用,一般我們常見的功能都是由客戶端發動,傳遞request給後端,再由後端回應,我們這次反過來,由server端主動派送訊息給前端,以下實作:
@@ core/src/lib.rs @@
+pub mod game_message;
@@ web/Cargo.toml @@
[dependencies]
+rand = { workspace = true }
在core裡寫一個罐頭訊息工廠,會隨機挑選一則預先寫好的訊息。
// core/src/game_message.rs
use rand::Rng;
pub fn message_factory() -> String {
let messages = vec![
"我得好好策略一下。",
"等等,我差點就贏了!",
"看來我需要改變策略。",
// ...略
];
let mut rng = rand::thread_rng();
let index = rng.gen_range(0..messages.len());
messages[index].to_string()
}
在web_socket裡,新增一支程式,會隨機等待一段時間,接著發送訊息給所有現存連線的ws用戶端。
// web/src/web_socket.rs
use tokio::{time::Duration, time};
use rand::random;
pub async fn polling_message(ctx: &AppContext) {
let ctx = ctx.clone();
// 因為要一直存活才能處理,故開分身:
tokio::task::spawn(async move {
loop {
// 隨機等待1000豪秒 至10000豪秒
let secs = random::<u64>() % 9_000 + 1_000;
time::sleep(Duration::from_millis(secs)).await;
// 從訊息庫隨機取訊息並發送至用戶端
let message = my_core::game_message::message_factory();
send_all_message(0,Message::text(message), &ctx).await;
}
});
}
最後到main裡註冊我們剛剛寫的polling_message
:
use web::web_socket::polling_message; // 加這行
#[tokio::main]
async fn main() {
config::init();
let _logger = Logger::builder().use_env().build();
let app_context = AppContext::default();
polling_message(&app_context).await; // 加這行
let routers = routers::all_routers(app_context.clone());
warp::serve(routers).run(([0, 0, 0, 0], config::http_port())).await;
}
最後在前端實測一下:
可以看到在前端有正確的接受後端不定期派送的訊息,到這邊為止我們完成了後端的websocket服務,並作實作一個隨機時間自動派送訊息的功能。
本系列專案源始碼放置於 https://github.com/kenstt/demo-app